# uncompyle6 version 3.2.3
# Python bytecode 3.6 (3379)
# Decompiled from: Python 3.6.8 |Anaconda custom (64-bit)| (default, Feb 21 2019, 18:30:04) [MSC v.1916 64 bit (AMD64)]
# Embedded file name: asyncio\unix_events.py
"""Selector event loop for Unix with signal handling."""
import errno, os, signal, socket, stat, subprocess, sys, threading, warnings
from . import base_events
from . import base_subprocess
from . import compat
from . import constants
from . import coroutines
from . import events
from . import futures
from . import selector_events
from . import selectors
from . import transports
from .coroutines import coroutine
from .log import logger

# Names exported by a star-import of this module.
__all__ = [
    "SelectorEventLoop",
    "AbstractChildWatcher",
    "SafeChildWatcher",
    "FastChildWatcher",
    "DefaultEventLoopPolicy",
]
# Fail the import outright on Windows; everything below relies on Unix
# signal handling.
if sys.platform == "win32":
    raise ImportError("Signals are not really supported on Windows")


def _sighandler_noop(signum, frame):
    """Signal handler that deliberately does nothing.

    Both arguments follow the standard ``signal.signal()`` handler signature
    and are ignored.
    """
    return None


# Use os.fspath() when the running interpreter provides it; otherwise fall
# back to an identity function so callers can always go through _fspath().
if hasattr(os, "fspath"):
    _fspath = os.fspath
else:
    _fspath = lambda path: path


class _UnixSelectorEventLoop(selector_events.BaseSelectorEventLoop):
    """Unix event loop.

    Adds signal handling and UNIX Domain Socket support to SelectorEventLoop.
    """

    def __init__(self, selector=None):
        super().__init__(selector)
        # Maps signal number -> events.Handle wrapping the registered callback.
        self._signal_handlers = {}

    def _socketpair(self):
        """Return a connected pair of sockets from socket.socketpair()."""
        return socket.socketpair()

    def close(self):
        """Close the loop, then uninstall every signal handler it registered."""
        super().close()
        # Iterate over a snapshot: remove_signal_handler() deletes entries.
        for sig in list(self._signal_handlers):
            self.remove_signal_handler(sig)

    def _process_self_data(self, data):
        """Dispatch each non-zero value in *data* as a signal number."""
        for signum in data:
            if not signum:
                # Zero is never a valid signal number (see _check_signal).
                continue
            self._handle_signal(signum)

    def add_signal_handler(self, sig, callback, *args):
        """Add a handler for a signal.  UNIX only.

        Raise TypeError if callback is a coroutine object or coroutine
        function, or if sig is not an int.
        Raise ValueError if the signal number is out of range.
        Raise RuntimeError if there is a problem setting up the handler
        (the wakeup fd cannot be installed or the signal cannot be caught).
        Any other OSError from installing the handler is propagated as-is.
        """
        if coroutines.iscoroutine(callback) or coroutines.iscoroutinefunction(callback):
            raise TypeError("coroutines cannot be used with add_signal_handler()")
        self._check_signal(sig)
        self._check_closed()
        try:
            # Have the interpreter write to the loop's self-pipe whenever a
            # signal arrives, so the selector wakes up.
            signal.set_wakeup_fd(self._csock.fileno())
        except (ValueError, OSError) as exc:
            raise RuntimeError(str(exc))

        handle = events.Handle(callback, args, self)
        self._signal_handlers[sig] = handle

        try:
            # Install a do-nothing Python-level handler; the real callback is
            # scheduled by _handle_signal().
            signal.signal(sig, _sighandler_noop)
            # Ask for interrupted system calls to be restarted.
            signal.siginterrupt(sig, False)
        except OSError as exc:
            # Roll back the registration made above.
            del self._signal_handlers[sig]
            if not self._signal_handlers:
                try:
                    signal.set_wakeup_fd(-1)
                except (ValueError, OSError) as nexc:
                    logger.info("set_wakeup_fd(-1) failed: %s", nexc)

            # Always report the failure, whether or not other handlers
            # remain registered.
            if exc.errno == errno.EINVAL:
                raise RuntimeError(("sig {} cannot be caught").format(sig))
            raise

    def _handle_signal(self, sig):
        """Internal helper that is the actual signal handler."""
        handle = self._signal_handlers.get(sig)
        if handle is None:
            # Handler was removed in the meantime.
            return
        if handle._cancelled:
            self.remove_signal_handler(sig)
        else:
            self._add_callback_signalsafe(handle)

    def remove_signal_handler(self, sig):
        """Remove a handler for a signal.  UNIX only.

        Return True if a signal handler was removed, False if not.
        Raise RuntimeError if the default handler cannot be restored because
        the signal cannot be caught.
        """
        self._check_signal(sig)
        try:
            del self._signal_handlers[sig]
        except KeyError:
            return False

        # Restore the default disposition for this signal.
        if sig == signal.SIGINT:
            handler = signal.default_int_handler
        else:
            handler = signal.SIG_DFL

        try:
            signal.signal(sig, handler)
        except OSError as exc:
            if exc.errno == errno.EINVAL:
                raise RuntimeError(("sig {} cannot be caught").format(sig))
            raise

        # Once the last handler is gone the wakeup fd is no longer needed.
        if not self._signal_handlers:
            try:
                signal.set_wakeup_fd(-1)
            except (ValueError, OSError) as exc:
                logger.info("set_wakeup_fd(-1) failed: %s", exc)

        return True

    def _check_signal(self, sig):
        """Internal helper to validate a signal.

        Raise TypeError if sig is not an int.
        Raise ValueError if sig is outside range(1, signal.NSIG).
        """
        if not isinstance(sig, int):
            raise TypeError(("sig must be an int, not {!r}").format(sig))
        if not 1 <= sig < signal.NSIG:
            raise ValueError(("sig {} out of range(1, {})").format(sig, signal.NSIG))

    def _make_read_pipe_transport(self, pipe, protocol, waiter=None, extra=None):
        return _UnixReadPipeTransport(self, pipe, protocol, waiter, extra)

    def _make_write_pipe_transport(self, pipe, protocol, waiter=None, extra=None):
        return _UnixWritePipeTransport(self, pipe, protocol, waiter, extra)

    @coroutine
    def _make_subprocess_transport(
        self,
        protocol,
        args,
        shell,
        stdin,
        stdout,
        stderr,
        bufsize,
        extra=None,
        **kwargs
    ):
        """Start a subprocess and return its transport once it is set up.

        The child is created inside the child watcher's context so that the
        watcher can register the pid before reporting its termination.  If
        waiting for the transport fails, the transport is closed, waited on,
        and the original exception is re-raised.
        """
        with events.get_child_watcher() as watcher:
            waiter = self.create_future()
            # The transport needs the loop, the protocol and the full process
            # description; they are forwarded to _start() by the base class.
            transp = _UnixSubprocessTransport(
                self,
                protocol,
                args,
                shell,
                stdin,
                stdout,
                stderr,
                bufsize,
                waiter=waiter,
                extra=extra,
                **kwargs
            )
            watcher.add_child_handler(
                transp.get_pid(), self._child_watcher_callback, transp
            )
            try:
                yield from waiter
            except Exception as exc:
                # Stash the error so the cleanup below runs outside of the
                # except block (no ``yield from`` inside the handler).
                err = exc
            else:
                err = None

            if err is not None:
                transp.close()
                yield from transp._wait()
                raise err
        return transp

    def _child_watcher_callback(self, pid, returncode, transp):
        # May be invoked from outside the loop's thread, hence threadsafe.
        self.call_soon_threadsafe(transp._process_exited, returncode)

    @coroutine
    def create_unix_connection(
        self, protocol_factory, path, *, ssl=None, sock=None, server_hostname=None
    ):
        """Connect to a UNIX domain stream socket.

        Exactly one of *path* (address to connect to) or *sock* (an already
        created AF_UNIX stream socket) must be given.  *server_hostname* is
        required with *ssl* and rejected without it.

        Return a ``(transport, protocol)`` pair.
        Raise ValueError on an inconsistent combination of arguments.
        """
        if not server_hostname is None:
            if not isinstance(server_hostname, str):
                raise AssertionError
        if ssl:
            if server_hostname is None:
                raise ValueError("you have to pass server_hostname when using ssl")
        else:
            if server_hostname is not None:
                raise ValueError("server_hostname is only meaningful with ssl")

        if path is not None:
            if sock is not None:
                raise ValueError(
                    "path and sock can not be specified at the same time"
                )
            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM, 0)
            try:
                sock.setblocking(False)
                yield from self.sock_connect(sock, path)
            except BaseException:
                # Do not leak the socket we created, whatever went wrong.
                sock.close()
                raise
        else:
            if sock is None:
                raise ValueError("no path and sock were specified")
            if sock.family != socket.AF_UNIX or not base_events._is_stream_socket(
                sock
            ):
                raise ValueError(
                    ("A UNIX Domain Stream Socket was expected, got {!r}").format(
                        sock
                    )
                )
            sock.setblocking(False)

        transport, protocol = yield from self._create_connection_transport(
            sock, protocol_factory, ssl, server_hostname
        )
        return (transport, protocol)

    @coroutine
    def create_unix_server(
        self, protocol_factory, path=None, *, sock=None, backlog=100, ssl=None
    ):
        """Create a server listening on a UNIX domain stream socket.

        Exactly one of *path* (address to bind) or *sock* (an already created
        AF_UNIX stream socket) must be given.

        Return the ``base_events.Server`` object.
        Raise TypeError if *ssl* is a bool, ValueError on an inconsistent
        combination of arguments, and OSError if binding fails.
        """
        if isinstance(ssl, bool):
            raise TypeError("ssl argument must be an SSLContext or None")

        if path is not None:
            if sock is not None:
                raise ValueError("path and sock can not be specified at the same time")
            path = _fspath(path)
            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)

            # A leading NUL (bytes -> 0, str -> "\x00") marks an abstract
            # socket address, which has no file system entry to clean up.
            if path[0] not in (0, "\x00"):
                try:
                    if stat.S_ISSOCK(os.stat(path).st_mode):
                        os.remove(path)
                except FileNotFoundError:
                    pass
                except OSError as err:
                    # Best effort only: bind() below reports real problems.
                    logger.error(
                        "Unable to check or remove stale UNIX socket %r: %r", path, err
                    )

            try:
                sock.bind(path)
            except OSError as exc:
                sock.close()
                if exc.errno == errno.EADDRINUSE:
                    # Include the offending address in the error message.
                    msg = ("Address {!r} is already in use").format(path)
                    raise OSError(errno.EADDRINUSE, msg) from None
                else:
                    raise
            except BaseException:
                sock.close()
                raise
        else:
            if sock is None:
                raise ValueError("path was not specified, and no sock specified")
            if sock.family != socket.AF_UNIX or not base_events._is_stream_socket(
                sock
            ):
                raise ValueError(
                    ("A UNIX Domain Stream Socket was expected, got {!r}").format(
                        sock
                    )
                )

        server = base_events.Server(self, [sock])
        sock.listen(backlog)
        sock.setblocking(False)
        self._start_serving(protocol_factory, sock, ssl, server)
        return server


if not hasattr(os, "set_blocking"):
    import fcntl

    def _set_nonblocking(fd):
        """Put file descriptor *fd* into non-blocking mode via fcntl()."""
        current = fcntl.fcntl(fd, fcntl.F_GETFL)
        fcntl.fcntl(fd, fcntl.F_SETFL, current | os.O_NONBLOCK)


else:

    def _set_nonblocking(fd):
        """Put file descriptor *fd* into non-blocking mode."""
        os.set_blocking(fd, False)


class _UnixReadPipeTransport(transports.ReadTransport):
    """Read transport over a pipe, socket or character device.

    Data read from the file descriptor is handed to the protocol's
    data_received(); end of file triggers eof_received() followed by
    connection_lost(None).
    """

    # Upper bound on the number of bytes requested per os.read() call.
    max_size = 262144

    def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
        """Wrap *pipe* for reading on *loop* and notify *protocol*.

        Raise ValueError if *pipe* is not a FIFO, socket or character device.
        If *waiter* is given, its result is set (unless cancelled) after
        connection_made() has been scheduled.
        """
        super().__init__(extra)
        self._extra["pipe"] = pipe
        self._loop = loop
        self._pipe = pipe
        self._fileno = pipe.fileno()
        self._protocol = protocol
        self._closing = False
        mode = os.fstat(self._fileno).st_mode
        if not (stat.S_ISFIFO(mode) or stat.S_ISSOCK(mode) or stat.S_ISCHR(mode)):
            # Drop references so __del__ does not treat this as an open
            # transport.
            self._pipe = None
            self._fileno = None
            self._protocol = None
            raise ValueError("Pipe transport is for pipes/sockets only.")
        _set_nonblocking(self._fileno)
        # call_soon() preserves order: connection_made() runs first, then
        # reading starts, then the waiter is woken up.
        self._loop.call_soon(self._protocol.connection_made, self)
        self._loop.call_soon(self._loop._add_reader, self._fileno, self._read_ready)
        if waiter is not None:
            self._loop.call_soon(futures._set_result_unless_cancelled, waiter, None)

    def __repr__(self):
        """Return ``<ClassName [closed|closing] fd=N state>``.

        *state* is "polling"/"idle" while the pipe is open and the loop has a
        selector, "open" if the pipe is open without a selector, and "closed"
        otherwise.
        """
        info = [self.__class__.__name__]
        if self._pipe is None:
            info.append("closed")
        elif self._closing:
            info.append("closing")
        info.append("fd=%s" % self._fileno)
        # self._loop is None after connection_lost; getattr() tolerates that.
        selector = getattr(self._loop, "_selector", None)
        if self._pipe is not None and selector is not None:
            polling = selector_events._test_selector_event(
                selector, self._fileno, selectors.EVENT_READ
            )
            if polling:
                info.append("polling")
            else:
                info.append("idle")
        elif self._pipe is not None:
            info.append("open")
        else:
            info.append("closed")
        return "<%s>" % (" ").join(info)

    def _read_ready(self):
        """Reader callback: read once and forward data or handle EOF."""
        try:
            data = os.read(self._fileno, self.max_size)
        except (BlockingIOError, InterruptedError):
            # Nothing to read right now; wait for the next readiness event.
            pass
        except OSError as exc:
            self._fatal_error(exc, "Fatal read error on pipe transport")
        else:
            if data:
                self._protocol.data_received(data)
            else:
                # Empty read means end of file.
                if self._loop.get_debug():
                    logger.info("%r was closed by peer", self)
                self._closing = True
                self._loop._remove_reader(self._fileno)
                self._loop.call_soon(self._protocol.eof_received)
                self._loop.call_soon(self._call_connection_lost, None)

    def pause_reading(self):
        self._loop._remove_reader(self._fileno)

    def resume_reading(self):
        self._loop._add_reader(self._fileno, self._read_ready)

    def set_protocol(self, protocol):
        self._protocol = protocol

    def get_protocol(self):
        return self._protocol

    def is_closing(self):
        return self._closing

    def close(self):
        """Close the transport; a no-op if it is already closing."""
        if not self._closing:
            self._close(None)

    if compat.PY34:

        def __del__(self):
            # Warn about, and release, a pipe that was never closed.
            if self._pipe is not None:
                warnings.warn(
                    "unclosed transport %r" % self, ResourceWarning, source=self
                )
                self._pipe.close()

    def _fatal_error(self, exc, message="Fatal error on pipe transport"):
        """Report *exc* and close the transport.

        An OSError with errno EIO is only logged (and only in debug mode);
        every other exception is passed to the loop's exception handler.
        The transport is closed with *exc* in both cases.
        """
        if isinstance(exc, OSError) and exc.errno == errno.EIO:
            if self._loop.get_debug():
                logger.debug("%r: %s", self, message, exc_info=True)
        else:
            self._loop.call_exception_handler(
                {
                    "message": message,
                    "exception": exc,
                    "transport": self,
                    "protocol": self._protocol,
                }
            )
        self._close(exc)

    def _close(self, exc):
        """Stop reading and schedule connection_lost(exc)."""
        self._closing = True
        self._loop._remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        """Notify the protocol, then release the pipe and all references."""
        try:
            self._protocol.connection_lost(exc)
        finally:
            self._pipe.close()
            self._pipe = None
            self._protocol = None
            self._loop = None


class _UnixWritePipeTransport(transports._FlowControlMixin, transports.WriteTransport):
    """Write transport over a pipe, socket or character device.

    Data that cannot be written immediately is kept in an internal buffer
    and flushed from a writer callback; the flow-control mixin pauses and
    resumes the protocol according to the buffer size.
    """

    def __init__(self, loop, pipe, protocol, waiter=None, extra=None):
        """Wrap *pipe* for writing on *loop* and notify *protocol*.

        Raise ValueError if *pipe* is not a FIFO, socket or character device.
        If *waiter* is given, its result is set (unless cancelled) after
        connection_made() has been scheduled.
        """
        super().__init__(extra, loop)
        self._extra["pipe"] = pipe
        self._pipe = pipe
        self._fileno = pipe.fileno()
        self._protocol = protocol
        self._buffer = bytearray()
        self._conn_lost = 0
        self._closing = False
        mode = os.fstat(self._fileno).st_mode
        is_char = stat.S_ISCHR(mode)
        is_fifo = stat.S_ISFIFO(mode)
        is_socket = stat.S_ISSOCK(mode)
        if not (is_char or is_fifo or is_socket):
            # Drop references so __del__ does not treat this as an open
            # transport.
            self._pipe = None
            self._fileno = None
            self._protocol = None
            raise ValueError(
                "Pipe transport is only for pipes, sockets and character devices"
            )
        _set_nonblocking(self._fileno)
        self._loop.call_soon(self._protocol.connection_made, self)
        # A read event on the write end is how a closed peer is detected
        # (see _read_ready).  Not registered for character devices, nor for
        # FIFOs on AIX -- presumably unsupported there; confirm.
        if is_socket or (is_fifo and not sys.platform.startswith("aix")):
            self._loop.call_soon(self._loop._add_reader, self._fileno, self._read_ready)
        if waiter is not None:
            self._loop.call_soon(futures._set_result_unless_cancelled, waiter, None)

    def __repr__(self):
        """Return ``<ClassName [closed|closing] fd=N state>``.

        While the pipe is open and the loop has a selector, *state* is
        "polling"/"idle" followed by ``bufsize=N``; it is "open" if the pipe
        is open without a selector, and "closed" otherwise.
        """
        info = [self.__class__.__name__]
        if self._pipe is None:
            info.append("closed")
        elif self._closing:
            info.append("closing")
        info.append("fd=%s" % self._fileno)
        # self._loop is None after connection_lost; getattr() tolerates that.
        selector = getattr(self._loop, "_selector", None)
        if self._pipe is not None and selector is not None:
            polling = selector_events._test_selector_event(
                selector, self._fileno, selectors.EVENT_WRITE
            )
            if polling:
                info.append("polling")
            else:
                info.append("idle")
            bufsize = self.get_write_buffer_size()
            info.append("bufsize=%s" % bufsize)
        elif self._pipe is not None:
            info.append("open")
        else:
            info.append("closed")
        return "<%s>" % (" ").join(info)

    def get_write_buffer_size(self):
        """Return the number of bytes waiting in the write buffer."""
        return len(self._buffer)

    def _read_ready(self):
        """Reader callback: the peer closed its end of the pipe."""
        if self._loop.get_debug():
            logger.info("%r was closed by peer", self)
        if self._buffer:
            # Pending data can no longer be delivered.
            self._close(BrokenPipeError())
        else:
            self._close()

    def write(self, data):
        """Write *data*, buffering whatever cannot be written immediately.

        *data* must be bytes, bytearray or memoryview (AssertionError
        otherwise).  Writes on a closing or broken transport are dropped and
        counted; a warning is logged once the count reaches the threshold.
        """
        if not isinstance(data, (bytes, bytearray, memoryview)):
            raise AssertionError(repr(data))
        if isinstance(data, bytearray):
            data = memoryview(data)
        if not data:
            return
        if self._conn_lost or self._closing:
            if self._conn_lost >= constants.LOG_THRESHOLD_FOR_CONNLOST_WRITES:
                logger.warning(
                    "pipe closed by peer or os.write(pipe, data) raised exception."
                )
            self._conn_lost += 1
            return
        if not self._buffer:
            # Nothing queued: try to write right away.
            try:
                n = os.write(self._fileno, data)
            except (BlockingIOError, InterruptedError):
                n = 0
            except Exception as exc:
                self._conn_lost += 1
                self._fatal_error(exc, "Fatal write error on pipe transport")
                return

            if n == len(data):
                return
            if n > 0:
                # Keep only the part that was not written.
                data = memoryview(data)[n:]
            self._loop._add_writer(self._fileno, self._write_ready)
        self._buffer += data
        self._maybe_pause_protocol()

    def _write_ready(self):
        """Writer callback: flush as much of the buffer as possible."""
        if not self._buffer:
            raise AssertionError("Data should not be empty")
        try:
            n = os.write(self._fileno, self._buffer)
        except (BlockingIOError, InterruptedError):
            pass
        except Exception as exc:
            self._buffer.clear()
            self._conn_lost += 1
            # _close() only removes the writer when the buffer is non-empty,
            # and it has just been cleared, so remove it here.
            self._loop._remove_writer(self._fileno)
            self._fatal_error(exc, "Fatal write error on pipe transport")
        else:
            if n == len(self._buffer):
                self._buffer.clear()
                self._loop._remove_writer(self._fileno)
                self._maybe_resume_protocol()
                if self._closing:
                    # write_eof() was requested and everything is flushed.
                    self._loop._remove_reader(self._fileno)
                    self._call_connection_lost(None)
                return
            if n > 0:
                del self._buffer[:n]

    def can_write_eof(self):
        return True

    def write_eof(self):
        """Close the write end once buffered data has been flushed."""
        if self._closing:
            return
        if not self._pipe:
            raise AssertionError
        self._closing = True
        if not self._buffer:
            self._loop._remove_reader(self._fileno)
            self._loop.call_soon(self._call_connection_lost, None)

    def set_protocol(self, protocol):
        self._protocol = protocol

    def get_protocol(self):
        return self._protocol

    def is_closing(self):
        return self._closing

    def close(self):
        """Close the transport gracefully; equivalent to write_eof()."""
        if self._pipe is not None and not self._closing:
            self.write_eof()

    if compat.PY34:

        def __del__(self):
            # Warn about, and release, a pipe that was never closed.
            if self._pipe is not None:
                warnings.warn(
                    "unclosed transport %r" % self, ResourceWarning, source=self
                )
                self._pipe.close()

    def abort(self):
        """Close the transport immediately, discarding buffered data."""
        self._close(None)

    def _fatal_error(self, exc, message="Fatal error on pipe transport"):
        """Report *exc* and close the transport.

        Exceptions listed in base_events._FATAL_ERROR_IGNORE are only logged
        (and only in debug mode); every other exception is passed to the
        loop's exception handler.  The transport is closed with *exc* in both
        cases.
        """
        if isinstance(exc, base_events._FATAL_ERROR_IGNORE):
            if self._loop.get_debug():
                logger.debug("%r: %s", self, message, exc_info=True)
        else:
            self._loop.call_exception_handler(
                {
                    "message": message,
                    "exception": exc,
                    "transport": self,
                    "protocol": self._protocol,
                }
            )
        self._close(exc)

    def _close(self, exc=None):
        """Drop buffered data, stop I/O callbacks, schedule connection_lost."""
        self._closing = True
        if self._buffer:
            # A writer is only registered while the buffer is non-empty.
            self._loop._remove_writer(self._fileno)
        self._buffer.clear()
        self._loop._remove_reader(self._fileno)
        self._loop.call_soon(self._call_connection_lost, exc)

    def _call_connection_lost(self, exc):
        """Notify the protocol, then release the pipe and all references."""
        try:
            self._protocol.connection_lost(exc)
        finally:
            self._pipe.close()
            self._pipe = None
            self._protocol = None
            self._loop = None


if not hasattr(os, "set_inheritable"):
    import fcntl

    def _set_inheritable(fd, inheritable):
        """Set or clear the close-on-exec flag of *fd* via fcntl().

        An inheritable descriptor has the flag cleared; a non-inheritable
        one has it set.
        """
        close_on_exec = getattr(fcntl, "FD_CLOEXEC", 1)
        fd_flags = fcntl.fcntl(fd, fcntl.F_GETFD)
        if inheritable:
            new_flags = fd_flags & ~close_on_exec
        else:
            new_flags = fd_flags | close_on_exec
        fcntl.fcntl(fd, fcntl.F_SETFD, new_flags)


else:
    # The interpreter already provides the operation.
    _set_inheritable = os.set_inheritable


class _UnixSubprocessTransport(base_subprocess.BaseSubprocessTransport):
    """Subprocess transport that launches the child with subprocess.Popen."""

    def _start(self, args, shell, stdin, stdout, stderr, bufsize, **kwargs):
        """Spawn the child process and store it in ``self._proc``.

        When *stdin* is subprocess.PIPE, a socket pair from the loop is used
        instead of an OS pipe: the child gets one end, and the other end is
        wrapped in a binary file object and exposed as ``self._proc.stdin``.
        """
        stdin_w = None
        if stdin == subprocess.PIPE:
            stdin, stdin_w = self._loop._socketpair()
            # The parent's write end must not leak into the child.
            _set_inheritable(stdin_w.fileno(), False)
        self._proc = subprocess.Popen(
            args,
            shell=shell,
            stdin=stdin,
            stdout=stdout,
            stderr=stderr,
            universal_newlines=False,
            bufsize=bufsize,
            **kwargs
        )
        if stdin_w is not None:
            # The child holds its own copy of the read end now.
            stdin.close()
            # detach() hands the raw fd over to the file object.
            self._proc.stdin = open(stdin_w.detach(), "wb", buffering=bufsize)


class AbstractChildWatcher:
    """Interface for objects that monitor child processes.

    A watcher keeps track of a set of subprocesses and reports when one of
    them terminates or is interrupted by a signal.

    Callbacks are registered with .add_child_handler().  A new process must
    be started inside a 'with' block on the watcher so that the watcher can
    suspend its activity until the new process is fully registered (some
    implementations need this to avoid a race condition).

    Example:
        with watcher:
            proc = subprocess.Popen("sleep 1")
            watcher.add_child_handler(proc.pid, callback)

    Notes:
        Implementations of this class must be thread-safe.

        Because a watcher may catch SIGCHLD and call waitpid(-1), only one
        watcher object should be active per process.
    """

    def add_child_handler(self, pid, callback, *args):
        """Register a handler for the termination of process 'pid'.

        callback(pid, returncode, *args) is to be called when the process
        terminates.  Registering a second callback for the same process
        replaces the first one.

        Note: callback() must be thread-safe.
        """
        raise NotImplementedError

    def remove_child_handler(self, pid):
        """Remove the handler registered for process 'pid'.

        Return True if a handler was removed, False if there was none.
        """
        raise NotImplementedError

    def attach_loop(self, loop):
        """Attach the watcher to an event loop.

        A watcher that is already attached to a loop is detached from it
        before being attached to the new one.

        Note: loop may be None.
        """
        raise NotImplementedError

    def close(self):
        """Close the watcher.

        Must be called so that any underlying resource is released.
        """
        raise NotImplementedError

    def __enter__(self):
        """Enter the watcher's context; new processes may be started in it.

        Implementations must return self.
        """
        raise NotImplementedError

    def __exit__(self, a, b, c):
        """Leave the watcher's context."""
        raise NotImplementedError


class BaseChildWatcher(AbstractChildWatcher):
    """Shared machinery for watchers driven by the loop's SIGCHLD handler.

    Subclasses supply _do_waitpid() and _do_waitpid_all().
    """

    def __init__(self):
        # Event loop whose SIGCHLD handler drives this watcher (or None).
        self._loop = None
        # Maps pid -> (callback, args).
        self._callbacks = {}

    def close(self):
        """Detach from the current loop, if any."""
        self.attach_loop(None)

    def _do_waitpid(self, expected_pid):
        raise NotImplementedError

    def _do_waitpid_all(self):
        raise NotImplementedError

    def attach_loop(self, loop):
        """Move the SIGCHLD handler from the current loop to *loop*.

        *loop* may be None to detach only; a RuntimeWarning is issued when
        detaching while handlers are still registered.
        """
        if loop is not None and not isinstance(loop, events.AbstractEventLoop):
            raise AssertionError

        previous = self._loop
        if previous is not None:
            if loop is None and self._callbacks:
                warnings.warn(
                    "A loop is being detached from a child watcher with pending handlers",
                    RuntimeWarning,
                )
            previous.remove_signal_handler(signal.SIGCHLD)

        self._loop = loop
        if loop is None:
            return
        loop.add_signal_handler(signal.SIGCHLD, self._sig_chld)
        # Collect any child that terminated before the handler was installed.
        self._do_waitpid_all()

    def _sig_chld(self):
        """SIGCHLD callback: reap children, reporting errors to the loop."""
        try:
            self._do_waitpid_all()
        except Exception as exc:
            context = {
                "message": "Unknown exception in SIGCHLD handler",
                "exception": exc,
            }
            self._loop.call_exception_handler(context)

    def _compute_returncode(self, status):
        """Translate a waitpid() status into a return code.

        Termination by signal N yields -N, a normal exit yields the exit
        status, and anything else yields *status* unchanged.
        """
        if os.WIFSIGNALED(status):
            return -os.WTERMSIG(status)
        if os.WIFEXITED(status):
            return os.WEXITSTATUS(status)
        return status


class SafeChildWatcher(BaseChildWatcher):
    """'Safe' child watcher implementation.

    The SIGCHLD handler polls every registered process individually rather
    than calling os.waitpid(-1), so it does not interfere with other code
    that spawns processes.

    The price is a significant overhead with many children: each SIGCHLD
    costs O(n) in the number of registered processes.
    """

    def close(self):
        """Forget all handlers, then detach from the loop."""
        self._callbacks.clear()
        super().close()

    def __enter__(self):
        return self

    def __exit__(self, a, b, c):
        return None

    def add_child_handler(self, pid, callback, *args):
        """Register *callback* for *pid* and poll the process right away."""
        if self._loop is None:
            raise RuntimeError(
                "Cannot add child handler, the child watcher does not have a loop attached"
            )
        self._callbacks[pid] = (callback, args)
        # The child may already have exited before it was registered.
        self._do_waitpid(pid)

    def remove_child_handler(self, pid):
        """Return True if a handler for *pid* was removed, False otherwise."""
        try:
            del self._callbacks[pid]
        except KeyError:
            return False
        return True

    def _do_waitpid_all(self):
        # Snapshot the keys: _do_waitpid() removes entries as it goes.
        for registered_pid in list(self._callbacks):
            self._do_waitpid(registered_pid)

    def _do_waitpid(self, expected_pid):
        """Poll *expected_pid* once and fire its callback if it has exited."""
        if not expected_pid > 0:
            raise AssertionError

        try:
            pid, status = os.waitpid(expected_pid, os.WNOHANG)
        except ChildProcessError:
            # waitpid() does not know this child; report a fixed code.
            pid = expected_pid
            returncode = 255
            logger.warning(
                "Unknown child process pid %d, will report returncode 255", pid
            )
        else:
            if pid == 0:
                # With WNOHANG, pid 0 means the child has not exited yet.
                return
            returncode = self._compute_returncode(status)
            if self._loop.get_debug():
                logger.debug(
                    "process %s exited with returncode %s", expected_pid, returncode
                )

        try:
            handler, handler_args = self._callbacks.pop(pid)
        except KeyError:
            # No handler is registered for this pid any more.
            if self._loop.get_debug():
                logger.warning(
                    "Child watcher got an unexpected pid: %r", pid, exc_info=True
                )
            return
        handler(pid, returncode, *handler_args)


class FastChildWatcher(BaseChildWatcher):
    """'Fast' child watcher implementation.

    Every terminated process is reaped by calling os.waitpid(-1) directly,
    which may break other code that spawns processes and waits for them.

    Handling many children adds no noticeable overhead: the cost is O(1)
    per terminated child.
    """

    def __init__(self):
        super().__init__()
        # Guards _callbacks, _zombies and _forks.
        self._lock = threading.Lock()
        # pid -> returncode for children reaped before being registered.
        self._zombies = {}
        # Number of 'with' blocks currently active on this watcher.
        self._forks = 0

    def close(self):
        """Forget all handlers and zombies, then detach from the loop."""
        self._callbacks.clear()
        self._zombies.clear()
        super().close()

    def __enter__(self):
        with self._lock:
            self._forks += 1
        return self

    def __exit__(self, a, b, c):
        unclaimed = None
        with self._lock:
            self._forks -= 1
            # Only report once the outermost context exits with leftovers.
            if not self._forks and self._zombies:
                unclaimed = str(self._zombies)
                self._zombies.clear()
        if unclaimed is None:
            return
        logger.warning(
            "Caught subprocesses termination from unknown pids: %s", unclaimed
        )

    def add_child_handler(self, pid, callback, *args):
        """Register *callback* for *pid*, firing it now if already reaped.

        Must be called inside the watcher's 'with' block.
        """
        if not self._forks:
            raise AssertionError("Must use the context manager")
        if self._loop is None:
            raise RuntimeError(
                "Cannot add child handler, the child watcher does not have a loop attached"
            )
        with self._lock:
            try:
                returncode = self._zombies.pop(pid)
            except KeyError:
                # Not reaped yet: remember the handler for later.
                self._callbacks[pid] = (callback, args)
                return

        # Already reaped: call back outside of the lock.
        callback(pid, returncode, *args)

    def remove_child_handler(self, pid):
        """Return True if a handler for *pid* was removed, False otherwise."""
        try:
            del self._callbacks[pid]
        except KeyError:
            return False
        return True

    def _do_waitpid_all(self):
        """Reap every child that has terminated and dispatch its handler."""
        while True:
            try:
                pid, status = os.waitpid(-1, os.WNOHANG)
            except ChildProcessError:
                # There are no children to wait for.
                return

            if pid == 0:
                # Children exist but none has terminated.
                return
            returncode = self._compute_returncode(status)

            with self._lock:
                try:
                    handler, handler_args = self._callbacks.pop(pid)
                except KeyError:
                    handler = None
                    if self._forks:
                        # A 'with' block is active, so the pid may be
                        # registered shortly; park the result.
                        self._zombies[pid] = returncode
                        if self._loop.get_debug():
                            logger.debug(
                                "unknown process %s exited with returncode %s",
                                pid,
                                returncode,
                            )
                        continue
                else:
                    if self._loop.get_debug():
                        logger.debug(
                            "process %s exited with returncode %s", pid, returncode
                        )

            # Warn or call back outside of the lock.
            if handler is None:
                logger.warning(
                    "Caught subprocess termination from unknown pid: %d -> %d",
                    pid,
                    returncode,
                )
            else:
                handler(pid, returncode, *handler_args)


class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
    """UNIX event loop policy with a watcher for child processes."""

    _loop_factory = _UnixSelectorEventLoop

    def __init__(self):
        super().__init__()
        # Created lazily by get_child_watcher() or set by set_child_watcher().
        self._watcher = None

    def _init_watcher(self):
        """Create the default SafeChildWatcher if none exists yet."""
        with events._lock:
            if self._watcher is not None:
                return
            self._watcher = SafeChildWatcher()
            in_main_thread = isinstance(
                threading.current_thread(), threading._MainThread
            )
            if in_main_thread:
                self._watcher.attach_loop(self._local._loop)

    def set_event_loop(self, loop):
        """Set the event loop.

        Side effect: when a child watcher has already been set and this is
        called from the main thread, the watcher's .attach_loop(loop) is
        called as well.
        """
        super().set_event_loop(loop)
        if self._watcher is None:
            return
        if isinstance(threading.current_thread(), threading._MainThread):
            self._watcher.attach_loop(loop)

    def get_child_watcher(self):
        """Get the watcher for child processes.

        A SafeChildWatcher object is created automatically if none is set.
        """
        if self._watcher is None:
            self._init_watcher()
        return self._watcher

    def set_child_watcher(self, watcher):
        """Set the watcher for child processes, closing the previous one."""
        if watcher is not None and not isinstance(watcher, AbstractChildWatcher):
            raise AssertionError
        current = self._watcher
        if current is not None:
            current.close()
        self._watcher = watcher


# Public aliases for the Unix implementations defined in this module; both
# names are listed in __all__.
SelectorEventLoop = _UnixSelectorEventLoop
DefaultEventLoopPolicy = _UnixDefaultEventLoopPolicy
